package com360.etc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class IndexStepOne {
    public static class IndexStepOneMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
        //产生<hello-文件名，1>
        @Override
        protected void map(LongWritable key,Text value,Mapper<LongWritable,Text,Text,IntWritable>.Context context)
            throws IOException,InterruptedException{
            //从输入切片信息获取正在处理的一行数据所属的文件
            FileSplit inputSplit = (FileSplit)context.getInputSplit();
            String fileName = inputSplit.getPath().getName();

            String[] words = value.toString().split(" ");
            for (String w :words){
                //将单词-文件名作为key ， 1 作为value 输出
                context.write(new Text(w + "-" + fileName),new IntWritable(1));
            }
        }
    }
    public static class IndexStepOneReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        @Override
        protected void reduce(Text key,Iterable<IntWritable> values,
                              Reducer<Text,IntWritable,Text,IntWritable>.Context context)throws IOException,InterruptedException{
            int count = 0;
            for (IntWritable value : values){
                count +=value.get();
            }
            context.write(key,new IntWritable(count));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(IndexStepOne.class);

        job.setMapperClass(IndexStepOneMapper.class);
        job.setReducerClass(IndexStepOneReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("F:\\on"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\ddd"));

        job.setNumReduceTasks(3);
        job.waitForCompletion(true);


    }
}
